LakeSoul's Support for Python and Machine Learning

LakeSoul implements interfaces for PyTorch/PyArrow/HuggingFace/Ray, allowing users to read datasets from LakeSoul tables through these interfaces. Distributed reading is supported for both PyTorch and Ray. LakeSoul for Python is now available as a 1.0 Beta release.

Install

Download the LakeSoul wheel file

For users of Python 3.8, Python 3.9, and Python 3.10, we have prepared a separate wheel file for each version. Please download the appropriate one for your environment. We will publish an official package to pypi.org in the near future.

The Python package currently supports Linux only and can be used on distros with glibc 2.17 or above (CentOS 7 and above, Ubuntu 16.04 and above, etc.).
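If you are not sure which glibc version your distro ships, one quick way to check from Python is the standard library's platform module (a minimal sketch; platform.libc_ver() may return an empty version string on some systems):

import platform

# prints e.g. ('glibc', '2.31') on most Linux distros
print(platform.libc_ver())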

Assuming we are using Python 3.8, we can download the wheel file as below:

wget https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/python/v1.0/lakesoul-1.0.0b0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

Set up a Python virtual environment

We have provided several AI training examples that use LakeSoul as the data source. Follow the instructions below to set up a testing environment.

# change python version if needed
conda create -n lakesoul_test python=3.8
conda activate lakesoul_test
git clone https://github.com/lakesoul-io/LakeSoul.git
cd LakeSoul/python/examples
# replace ${PWD} with your wheel file directory in requirements.txt
pip install -r requirements.txt
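
After the installation finishes, a quick sanity check is to import the integration modules used in the examples below (this assumes the wheel was installed successfully):

# verify that the LakeSoul package and its integrations can be imported
import lakesoul.huggingface
import lakesoul.ray
from lakesoul.arrow import lakesoul_dataset

print('LakeSoul Python package is ready')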

PyTorch API Usage

LakeSoul implements interfaces for PyTorch/HuggingFace, which allow users to directly export data from LakeSoul tables into HuggingFace datasets.

API for reading tables:

import datasets
import lakesoul.huggingface

dataset = datasets.IterableDataset.from_lakesoul("lakesoul_table", partitions={'split': 'train'})

You can create a PyTorch/HuggingFace dataset for training this way. The dataset is automatically aware of the distributed training environment, so no additional parameters are required when initializing it.

Below is example code that exports the feature-transformed Titanic dataset from LakeSoul, then trains and validates a DNN model on it.

import argparse
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init

import datasets
import lakesoul.huggingface

# hyper parameters
SEED = 0
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
batch_size = 50
num_epochs = 50
learning_rate = 0.01
weight_decay = 0.005

# label and feature columns
label_column = 'label'
feature_columns = 'f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,f17,f18,f19,f20,f21,f22,f23,f24,f25,f26'.split(',')


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.bn = nn.BatchNorm1d(26)
        self.fc1 = nn.Linear(26, 256, bias=True)
        self.fc2 = nn.Linear(256, 2, bias=True)
        self._initialize_weights()

    def forward(self, x):
        x = self.bn(x)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = torch.sigmoid(x)
        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    init.constant_(m.bias, 0)

def batchify(dataset, batch_size):
    # group streaming records into fixed-size feature/label batches
    X_train = []
    y_train = []
    for item in dataset:
        feature_list = [item[feature] for feature in feature_columns]
        X_train.append(feature_list)
        y_train.append(int(item[label_column]))
        if len(y_train) == batch_size:
            yield X_train, y_train
            X_train = []
            y_train = []
    # handle the remaining records that don't fill up a full batch
    if len(y_train) > 0:
        yield X_train, y_train


def train_model(net, datasource, num_epochs, batch_size, learning_rate):
    dataset = datasets.IterableDataset.from_lakesoul(datasource, partitions={'split': 'train'})

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(net.parameters(), lr=learning_rate, weight_decay=weight_decay)

    for epoch in range(num_epochs):
        if epoch % 5 == 0:
            print('Epoch {}'.format(epoch + 1))
        for X_train, y_train in batchify(dataset, batch_size):
            x_var = torch.tensor(X_train, dtype=torch.float32)
            y_var = torch.tensor(y_train, dtype=torch.long)
            optimizer.zero_grad()
            ypred_var = net(x_var)
            loss = criterion(ypred_var, y_var)
            loss.backward()
            optimizer.step()


def evaluate_model(net, datasource, batch_size):
    dataset = datasets.IterableDataset.from_lakesoul(datasource, partitions={'split': 'val'})
    num_samples = 0
    num_correct = 0

    for X_val, y_val in batchify(dataset, batch_size):
        test_var = torch.tensor(X_val, dtype=torch.float32)
        with torch.no_grad():
            result = net(test_var)
        _, labels = torch.max(result, 1)
        num_correct += np.sum(labels.numpy() == y_val)
        num_samples += len(y_val)

    accuracy = num_correct / num_samples
    print('Accuracy {:.2f}'.format(accuracy))


def main(table):
    net = Net()
    # note: num_epochs comes before batch_size in train_model's signature
    train_model(net, table, num_epochs, batch_size, learning_rate)
    evaluate_model(net, table, batch_size)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--table', type=str, default='titanic_trans', help='lakesoul table name')
    args = parser.parse_args()

    main(args.table)
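
To train against a different LakeSoul table, pass its name on the command line, e.g. python train.py --table my_table (the script file name here is illustrative).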

More examples are available under LakeSoul/python/examples.

Ray DataSource

LakeSoul implements a Ray Datasource. Example usage:

import ray.data
import lakesoul.ray
ds = ray.data.read_lakesoul("table_name", partitions={'split': 'train'})
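
The returned object is a regular Ray Dataset, so the usual Ray Data operations apply. As a minimal sketch (assuming a local or already-initialized Ray session), the table can be streamed in batches instead of being materialized at once:

import ray.data
import lakesoul.ray

ds = ray.data.read_lakesoul("table_name", partitions={'split': 'train'})

# stream the table in batches; avoids loading everything into memory at once
for batch in ds.iter_batches(batch_size=1024):
    ...  # process each batch, e.g. feed it to a trainer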

PyArrow/Pandas Reads LakeSoul Table

LakeSoul also supports reading data on a single machine for computation with PyArrow and Pandas. Reads return a PyArrow Dataset object, which supports iterative access. Example:

from lakesoul.arrow import lakesoul_dataset

ds = lakesoul_dataset("table_name", partitions={'split': 'train'})

# iterate over batches in the dataset;
# this does not load the entire table into memory
for batch in ds.to_batches():
    ...

# convert to a pandas DataFrame;
# this loads the entire table into memory
df = ds.to_table().to_pandas()
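
Since reads return a PyArrow Dataset, batch-wise streaming composes with ordinary Python or PyArrow computation. For example, a minimal sketch that counts the rows of a table while keeping only one batch in memory at a time (the table and partition names are illustrative):

from lakesoul.arrow import lakesoul_dataset

ds = lakesoul_dataset("table_name", partitions={'split': 'train'})

# count rows batch by batch; peak memory usage stays at one batch
total_rows = sum(batch.num_rows for batch in ds.to_batches())
print(total_rows)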